/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.gcp.bigquery;



import com.google.api.services.bigquery.model.TableFieldSchema;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import org.apache.avro.Conversions;

import org.apache.avro.LogicalType;

import org.apache.avro.LogicalTypes;

import org.apache.avro.Schema;

import org.apache.avro.Schema.Field;

import org.apache.avro.Schema.Type;

import org.apache.avro.generic.GenericRecord;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableCollection;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableMultimap;

import com.bff.gaia.unified.vendor.guava.com.google.common.io.BaseEncoding;

import org.joda.time.format.DateTimeFormat;

import org.joda.time.format.DateTimeFormatter;



import javax.annotation.Nullable;

import java.math.BigDecimal;

import java.nio.ByteBuffer;

import java.time.LocalDate;

import java.time.LocalTime;

import java.time.format.DateTimeFormatterBuilder;

import java.util.ArrayList;

import java.util.List;



import static java.time.temporal.ChronoField.*;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects.firstNonNull;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Verify.verify;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Verify.verifyNotNull;



/**

 * A set of utilities for working with Avro files.

 *

 * <p>These utilities are based on the <a href="https://avro.apache.org/docs/1.8.1/spec.html">Avro

 * 1.8.1</a> specification.

 */

class BigQueryAvroUtils {



  /**
   * Maps each supported BigQuery type name to the Avro types that are accepted for it.
   *
   * <p>{@code DATE} and {@code TIME} carry two Avro types each, since slightly different Avro
   * records are produced when exporting data in Avro format and when reading data directly using
   * the read API. The values of a key are listed in a deliberate order: {@code convertField} in
   * this class picks the first value returned for a type when it builds an Avro schema, so
   * reordering the entries of a key changes the generated schema.
   */
  public static final ImmutableMultimap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
      ImmutableMultimap.<String, Type>builder()
          .put("STRING", Type.STRING)
          .put("GEOGRAPHY", Type.STRING)
          .put("BYTES", Type.BYTES)
          .put("INTEGER", Type.LONG)
          .put("FLOAT", Type.DOUBLE)
          .put("NUMERIC", Type.BYTES)
          .put("BOOLEAN", Type.BOOLEAN)
          .put("TIMESTAMP", Type.LONG)
          .put("RECORD", Type.RECORD)
          .putAll("DATE", Type.STRING, Type.INT)
          .put("DATETIME", Type.STRING)
          .putAll("TIME", Type.STRING, Type.LONG)
          .build();



  /**
   * Joda-Time formatter that prints an instant as {@code yyyy-MM-dd HH:mm:ss} in UTC, i.e. the
   * date-and-seconds part of a timestamp string matching JSON export.
   *
   * <p>{@code formatTimestamp} passes it epoch milliseconds (whole seconds multiplied by 1000) and
   * appends any sub-second fraction itself. Thread-safe and immutable.
   */
  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();



  /**
   * Formats a BigQuery TIMESTAMP value as a UTC date-time string.
   *
   * <p>The input is in "microseconds since epoch" format; for example {@code 1452062291123456L}
   * is formatted as {@code "2016-01-06 06:38:11.123456 UTC"}. When the microsecond part is zero
   * the fraction is omitted entirely (e.g. {@code "2016-01-06 06:38:11 UTC"}).
   *
   * @param timestampMicro microseconds since the epoch; may be negative, must not be null
   * @return the formatted timestamp, always suffixed with {@code " UTC"}
   */
  @VisibleForTesting
  static String formatTimestamp(Long timestampMicro) {
    long totalMicros = timestampMicro;
    // Floor division/modulo keep the fraction within [0, 999999] for negative (pre-epoch) values,
    // borrowing one second from the whole-second part when needed.
    long epochSeconds = Math.floorDiv(totalMicros, 1_000_000L);
    long microsOfSecond = Math.floorMod(totalMicros, 1_000_000L);
    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(epochSeconds * 1000);

    if (microsOfSecond == 0) {
      return dayAndTime + " UTC";
    }
    // Locale.ROOT pins %06d to ASCII digits; without it the default locale decides which digit
    // characters are used, which would corrupt this machine-readable value on some systems.
    return String.format(java.util.Locale.ROOT, "%s.%06d UTC", dayAndTime, microsOfSecond);
  }



  /**
   * Renders a BigQuery DATE value as an ISO local date string ({@code yyyy-MM-dd}), the format
   * used by JSON export.
   *
   * @param date the date in "days since epoch" format (proleptic Gregorian calendar)
   * @return the ISO-8601 local date for that day
   */
  private static String formatDate(int date) {
    LocalDate day = LocalDate.ofEpochDay(date);
    return java.time.format.DateTimeFormatter.ISO_LOCAL_DATE.format(day);
  }



  /** Prints a time of day as {@code HH:mm:ss}, with no fractional seconds. */
  private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_SECONDS =
      new DateTimeFormatterBuilder()
          .appendValue(HOUR_OF_DAY, 2)
          .appendLiteral(':')
          .appendValue(MINUTE_OF_HOUR, 2)
          .appendLiteral(':')
          .appendValue(SECOND_OF_MINUTE, 2)
          .toFormatter();

  /** Prints a time of day as {@code HH:mm:ss.SSS}: always exactly three fraction digits. */
  private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MILLIS =
      new DateTimeFormatterBuilder()
          .append(ISO_LOCAL_TIME_FORMATTER_SECONDS)
          .appendLiteral('.')
          .appendFraction(NANO_OF_SECOND, 3, 3, false)
          .toFormatter();

  /** Prints a time of day as {@code HH:mm:ss.SSSSSS}: always exactly six fraction digits. */
  private static final java.time.format.DateTimeFormatter ISO_LOCAL_TIME_FORMATTER_MICROS =
      new DateTimeFormatterBuilder()
          .append(ISO_LOCAL_TIME_FORMATTER_SECONDS)
          .appendLiteral('.')
          .appendFraction(NANO_OF_SECOND, 6, 6, false)
          .toFormatter();



  /**
   * Renders a BigQuery TIME value as a string in the format used by JSON export.
   *
   * <p>The fraction is printed with the coarsest precision that loses nothing: no fraction for
   * whole seconds, three digits for whole milliseconds, six digits otherwise.
   *
   * @param timeMicros the time of day in "microseconds since midnight" format
   * @return the formatted time of day
   */
  private static String formatTime(long timeMicros) {
    LocalTime timeOfDay = LocalTime.ofNanoOfDay(timeMicros * 1000);
    if (timeMicros % 1000 != 0) {
      // Sub-millisecond component present: full microsecond precision is required.
      return timeOfDay.format(ISO_LOCAL_TIME_FORMATTER_MICROS);
    }
    if (timeMicros % 1000000 != 0) {
      // Whole milliseconds, but not a whole second.
      return timeOfDay.format(ISO_LOCAL_TIME_FORMATTER_MILLIS);
    }
    return timeOfDay.format(ISO_LOCAL_TIME_FORMATTER_SECONDS);
  }



  /**
   * Converts an Avro {@link GenericRecord} into a BigQuery {@link TableRow}, using the top-level
   * fields of the given table schema to drive the conversion.
   *
   * <p>See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">"Avro
   * format"</a> for more information.
   *
   * @param record the Avro record to convert
   * @param schema the BigQuery schema describing the record's fields
   * @return a row holding the converted value of every non-null field
   */
  static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
    List<TableFieldSchema> topLevelFields = schema.getFields();
    return convertGenericRecordToTableRow(record, topLevelFields);
  }



  /**
   * Converts an Avro record into a {@link TableRow} by walking the given BigQuery field schemas
   * and converting the Avro value stored under each field's name.
   *
   * @param record the Avro record to read values from
   * @param fields the BigQuery field schemas to convert, in output order
   * @return a row containing an entry for every field whose converted value is non-null
   * @throws NullPointerException if the Avro schema of {@code record} has no field with the name
   *     of one of the BigQuery fields
   */
  private static TableRow convertGenericRecordToTableRow(
      GenericRecord record, List<TableFieldSchema> fields) {
    TableRow row = new TableRow();
    for (TableFieldSchema subSchema : fields) {
      // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
      // is required, so it may not be null.
      String fieldName = subSchema.getName();
      // Schema#getField returns null for an unknown name; fail with a message that identifies
      // the mismatch instead of an anonymous NullPointerException on the next dereference.
      Field field =
          checkNotNull(
              record.getSchema().getField(fieldName),
              "Avro schema %s has no field named %s, which is listed in the BigQuery schema",
              record.getSchema().getFullName(),
              fieldName);
      Object convertedValue =
          getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
      if (convertedValue != null) {
        // To match the JSON files exported by BigQuery, do not include null values in the output.
        row.set(field.name(), convertedValue);
      }
    }

    return row;
  }



  /**
   * Converts a single Avro cell value according to the mode of its BigQuery field, dispatching to
   * the REQUIRED, REPEATED or NULLABLE conversion.
   *
   * @param schema the Avro schema of the field
   * @param fieldSchema the BigQuery schema of the field
   * @param v the raw Avro value, possibly null
   * @return the converted value, or null for an absent NULLABLE value
   * @throws UnsupportedOperationException if the field's mode is not one of the three known modes
   */
  @Nullable
  private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
    // is optional (and so it may be null), but defaults to "NULLABLE".
    String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
    if ("NULLABLE".equals(mode)) {
      return convertNullableField(schema, fieldSchema, v);
    }
    if ("REQUIRED".equals(mode)) {
      return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v);
    }
    if ("REPEATED".equals(mode)) {
      return convertRepeatedField(schema, fieldSchema, v);
    }
    throw new UnsupportedOperationException(
        "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
  }



  /**
   * Converts the value of a BigQuery REPEATED field, which is represented as an Avro array, into
   * a list of converted elements.
   *
   * @param schema the Avro schema of the field; verified to be of type ARRAY
   * @param fieldSchema the BigQuery schema of the field
   * @param v the Avro array value, or null for an empty repeated field
   * @return a new mutable list with one converted entry per array element; empty when {@code v}
   *     is null
   */
  private static List<Object> convertRepeatedField(
      Schema schema, TableFieldSchema fieldSchema, Object v) {
    Type containerType = schema.getType();
    verify(
        containerType == Type.ARRAY,
        "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
        fieldSchema.getName(),
        containerType);
    if (v == null) {
      // A missing array stands for an empty repeated field.
      return new ArrayList<>();
    }
    @SuppressWarnings("unchecked")
    List<Object> rawElements = (List<Object>) v;
    Schema elementSchema = schema.getElementType();
    Type elementType = elementSchema.getType();
    LogicalType elementLogicalType = elementSchema.getLogicalType();
    List<Object> converted = new ArrayList<>(rawElements.size());
    for (Object rawElement : rawElements) {
      // Every element is converted like a REQUIRED value of the element type.
      converted.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, rawElement));
    }
    return converted;
  }



  /**
   * Converts a non-null Avro value into the Java representation used in a {@link TableRow} for
   * the given BigQuery field.
   *
   * <p>The Avro type is first checked against {@link #BIG_QUERY_TO_AVRO_TYPES}; the value is then
   * converted based on the BigQuery type: strings, dates, times, integers, numerics, timestamps
   * and bytes become {@code String}s (bytes are base64-encoded), floats and booleans are returned
   * unchanged, and nested records become nested {@link TableRow}s.
   *
   * @param avroType the Avro type of the value
   * @param avroLogicalType the Avro logical type of the value, or null; only inspected for
   *     NUMERIC and for DATE/TIME values that arrive as Avro INT/LONG
   * @param fieldSchema the BigQuery schema of the field
   * @param v the Avro value; must not be null
   * @return the converted value
   * @throws NullPointerException if {@code v} is null
   * @throws UnsupportedOperationException if the BigQuery type passes validation but has no
   *     conversion
   */
  private static Object convertRequiredField(
      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
    // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
    // INTEGER type maps to an Avro LONG type.
    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
    // is required, so it may not be null.
    String bqType = fieldSchema.getType();
    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
    // A multimap lookup yields an empty collection, never null, for an unknown key, so an
    // unsupported type has to be detected by emptiness.
    verify(!expectedAvroTypes.isEmpty(), "Unsupported BigQuery type: %s", bqType);
    verify(
        expectedAvroTypes.contains(avroType),
        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
        expectedAvroTypes,
        bqType,
        fieldSchema.getName(),
        avroType);
    // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
    // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
    switch (bqType) {
      case "STRING":
      case "DATETIME":
      case "GEOGRAPHY":
        // Avro will use a CharSequence to represent String objects, but it may not always use
        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
        return v.toString();
      case "DATE":
        if (avroType == Type.INT) {
          // Integer-encoded dates are days since the epoch and must carry the Date logical type.
          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
          verifyNotNull(avroLogicalType, "Expected Date logical type");
          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
          return formatDate((Integer) v);
        } else {
          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
          return v.toString();
        }
      case "TIME":
        if (avroType == Type.LONG) {
          // Long-encoded times are microseconds since midnight and must carry TimeMicros.
          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
          verify(
              avroLogicalType instanceof LogicalTypes.TimeMicros,
              "Expected TimeMicros logical type");
          return formatTime((Long) v);
        } else {
          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
          return v.toString();
        }
      case "INTEGER":
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        return ((Long) v).toString();
      case "FLOAT":
        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
        return v;
      case "NUMERIC":
        // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
        // converted back to Strings with precision and scale determined by the logical type.
        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
        verifyNotNull(avroLogicalType, "Expected Decimal logical type");
        verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
        // Decode from a duplicate so the position of the record's own buffer is left untouched.
        BigDecimal numericValue =
            new Conversions.DecimalConversion()
                .fromBytes(((ByteBuffer) v).duplicate(), Schema.create(avroType), avroLogicalType);
        return numericValue.toString();
      case "BOOLEAN":
        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
        return v;
      case "TIMESTAMP":
        // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
        // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
        return formatTimestamp((Long) v);
      case "RECORD":
        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
        return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
      case "BYTES":
        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
        // Read from a duplicate so the source buffer is not consumed, and copy exactly the
        // bytes between position and limit; sizing by limit() alone underflows whenever the
        // buffer's position is not zero.
        ByteBuffer view = ((ByteBuffer) v).duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return BaseEncoding.base64().encode(bytes);
      default:
        throw new UnsupportedOperationException(
            String.format(
                "Unexpected BigQuery field schema type %s for field named %s",
                fieldSchema.getType(), fieldSchema.getName()));
    }
  }



  /**
   * Converts the value of a BigQuery NULLABLE field, which is represented as a two-member Avro
   * union of the value type and "null".
   *
   * @param avroSchema the Avro schema of the field; verified to be a UNION with exactly two
   *     members
   * @param fieldSchema the BigQuery schema of the field
   * @param v the Avro value, or null when the field is absent
   * @return null when {@code v} is null, otherwise the value converted as the union's non-null
   *     member type
   */
  @Nullable
  private static Object convertNullableField(
      Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
    verify(
        avroSchema.getType() == Type.UNION,
        "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
        avroSchema.getType(),
        fieldSchema.getName());
    List<Schema> members = avroSchema.getTypes();
    verify(
        members.size() == 2,
        "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
        fieldSchema.getName(),
        members);

    if (v == null) {
      return null;
    }

    // Use the first member unless it is the NULL branch, in which case the second one is the
    // value type.
    Schema leading = members.get(0);
    Schema valueSchema = leading.getType().equals(Type.NULL) ? members.get(1) : leading;
    return convertRequiredField(
        valueSchema.getType(), valueSchema.getLogicalType(), fieldSchema, v);
  }



  /**
   * Builds an Avro record schema equivalent to the given BigQuery field schemas.
   *
   * <p>The record is created in the {@code com.bff.gaia.unified.sdk.io.gcp.bigquery} namespace
   * with a doc string of the form "Translated Avro Schema for NAME".
   *
   * @param schemaName the name of the resulting Avro record
   * @param fieldSchemas the BigQuery fields to translate, in order
   * @return a non-error Avro record schema with one field per BigQuery field
   */
  static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fieldSchemas) {
    List<Field> avroFields = new ArrayList<>(fieldSchemas.size());
    for (TableFieldSchema bigQueryField : fieldSchemas) {
      avroFields.add(convertField(bigQueryField));
    }
    // Schema.createRecord takes (name, doc, namespace, isError, fields): the description goes
    // second and the namespace third. Passing them the other way round turns the free-text
    // description into the record's namespace.
    return Schema.createRecord(
        schemaName,
        "Translated Avro Schema for " + schemaName,
        "com.bff.gaia.unified.sdk.io.gcp.bigquery",
        false,
        avroFields);
  }



  /**
   * Translates one BigQuery field schema into an Avro {@link Field}.
   *
   * <p>The Avro type is the first one registered for the BigQuery type in {@link
   * #BIG_QUERY_TO_AVRO_TYPES}; RECORD fields are translated recursively. The field mode then
   * decides the wrapping: NULLABLE (or no mode) becomes a union of "null" and the type, REQUIRED
   * is the bare type, and REPEATED becomes an array of the type.
   *
   * @param bigQueryField the BigQuery field to translate
   * @return the Avro field, carrying the BigQuery field's name and description and no default
   * @throws IllegalArgumentException if the field's type has no Avro mapping or its mode is
   *     unknown
   */
  private static Field convertField(TableFieldSchema bigQueryField) {
    ImmutableCollection<Type> avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
    if (avroTypes.isEmpty()) {
      // Without this guard an unmapped type surfaces as a bare NoSuchElementException from
      // iterator().next(), with no hint about which field or type caused it.
      throw new IllegalArgumentException(
          String.format(
              "Unable to map BigQuery field type %s of field %s to an Avro type",
              bigQueryField.getType(), bigQueryField.getName()));
    }
    Type avroType = avroTypes.iterator().next();
    Schema elementSchema;
    if (avroType == Type.RECORD) {
      elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields());
    } else {
      elementSchema = Schema.create(avroType);
    }
    Schema fieldSchema;
    if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) {
      fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
    } else if ("REQUIRED".equals(bigQueryField.getMode())) {
      fieldSchema = elementSchema;
    } else if ("REPEATED".equals(bigQueryField.getMode())) {
      fieldSchema = Schema.createArray(elementSchema);
    } else {
      throw new IllegalArgumentException(
          String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode()));
    }
    return new Field(
        bigQueryField.getName(),
        fieldSchema,
        bigQueryField.getDescription(),
        (Object) null /* Cast to avoid deprecated JsonNode constructor. */);
  }

}